package com.czk.interceptors;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * @Author:ChenZhangKun
 * @Date: 2021/5/24 17:43
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    /**
     * 定义成工
     */
    private int success = 0;
    /**
     * 定义失败
     */
    private int error = 0;

    /**
     * @param producerRecord
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (recordMetadata == null) {
            // 成功加一
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close() {
        // 打印
        System.out.println("成功数目=" + success);
        System.out.println("失败数=" + error);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
